library(tidyverse)
## Warning: package 'tidyverse' was built under R version 3.5.3
## -- Attaching packages ----------------------------------------------------------------------------------------------------------------------------------- tidyverse 1.2.1 --
## v ggplot2 3.1.0     v purrr   0.2.5
## v tibble  1.4.2     v dplyr   0.7.8
## v tidyr   0.8.2     v stringr 1.3.1
## v readr   1.3.1     v forcats 0.3.0
## Warning: package 'readr' was built under R version 3.5.2
## Warning: package 'forcats' was built under R version 3.5.2
## -- Conflicts -------------------------------------------------------------------------------------------------------------------------------------- tidyverse_conflicts() --
## x dplyr::filter() masks stats::filter()
## x dplyr::lag()    masks stats::lag()
library(DT)
options = params$options # report parameter from the YAML header (note: masks base::options() inside this report)
read.clean.files = function(filename){
  # Read one raw results CSV (no header row) and attach the known column names
  file = read.csv(filename, header = FALSE)
  column.names = c("Language","Randomize","Dataset","MachineID","RunID","Type","Operation","TimeTaken")
  colnames(file) = column.names
  return(file)
}

files = list.files(path = "../Results/", pattern = "\\.csv$", recursive = TRUE, full.names = TRUE) # List all .csv files (dot escaped to match a literal ".")

databricks.files = files[grepl("Databricks", files)]
local.vm.files = files[grepl("Local_VM", files)]

rows.databricks = lapply(databricks.files, read.clean.files) # Read each file with clean column names
merged.databricks = do.call(rbind, rows.databricks) # Combine into one data.frame
merged.databricks$Setup = 'Databricks'

rows.local.vm = lapply(local.vm.files, read.clean.files) # Read each file with clean column names
merged.local.vm = do.call(rbind, rows.local.vm) # Combine into one data.frame
merged.local.vm$Setup = 'Local VM'

merged_data = rbind(merged.databricks, merged.local.vm)
merged_data$Setup = as.factor(merged_data$Setup)
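
Since the tidyverse is attached, the same read-and-label step can be written more compactly with purrr. A minimal equivalent sketch (read_setup is a hypothetical helper, not part of the pipeline above):

read_setup = function(file.list, setup.label){
  # hypothetical helper: read each CSV, row-bind, and tag every row with its setup
  file.list %>%
    purrr::map_dfr(read.clean.files) %>%
    dplyr::mutate(Setup = setup.label)
}
# merged_data = dplyr::bind_rows(read_setup(databricks.files, "Databricks"),
#                                read_setup(local.vm.files, "Local VM"))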

column.names = c("Language","Randomize","Dataset","MachineID","RunID","Type","Operation","TimeTaken","Setup")
colnames(merged_data) = column.names
merged_data$Type = as.factor(gsub(pattern = "Operations", replacement = "Operation", x = merged_data$Type))
merged_data = merged_data %>% filter(RunID != 1) # Exclude the first run from the analysis

# Convert columns to factors
merged_data$MachineID = as.factor(merged_data$MachineID)
merged_data$Randomize = as.factor(merged_data$Randomize)
merged_data$RunID = as.factor(merged_data$RunID)

merged_data$Dataset = sub("dataset_", "", merged_data$Dataset) 
merged_data$Dataset = sub("MB$", "", merged_data$Dataset) 
merged_data$Dataset = as.factor(merged_data$Dataset)
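
For illustration, the two sub() calls trim a raw label such as "dataset_100MB" in two steps:

sub("MB$", "", sub("dataset_", "", "dataset_100MB")) # "dataset_100MB" -> "100MB" -> "100"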

str(merged_data)
## 'data.frame':    5177 obs. of  9 variables:
##  $ Language : Factor w/ 2 levels "PySpark","Scala": 1 1 1 1 1 1 1 1 1 1 ...
##  $ Randomize: Factor w/ 1 level "1": 1 1 1 1 1 1 1 1 1 1 ...
##  $ Dataset  : Factor w/ 5 levels "10","100","200",..: 2 2 2 2 2 2 2 2 2 2 ...
##  $ MachineID: Factor w/ 2 levels "1","2": 1 1 1 1 1 1 1 1 1 1 ...
##  $ RunID    : Factor w/ 5 levels "2","3","4","5",..: 1 1 1 1 1 1 1 1 1 1 ...
##  $ Type     : Factor w/ 4 levels "Aggregate Operation",..: 2 4 3 2 3 2 3 2 2 1 ...
##  $ Operation: Factor w/ 37 levels " Filter"," Filter Reg Ex 1",..: 4 21 18 10 19 28 17 24 29 7 ...
##  $ TimeTaken: num  15.44 7.04 11.72 16.64 11.28 ...
##  $ Setup    : Factor w/ 2 levels "Databricks","Local VM": 1 1 1 1 1 1 1 1 1 1 ...
head(merged_data)
##   Language Randomize Dataset MachineID RunID             Type
## 1  PySpark         1     100         1     2 Column Operation
## 2  PySpark         1     100         1     2    Row Operation
## 3  PySpark         1     100         1     2  Mixed Operation
## 4  PySpark         1     100         1     2 Column Operation
## 5  PySpark         1     100         1     2  Mixed Operation
## 6  PySpark         1     100         1     2 Column Operation
##                     Operation TimeTaken      Setup
## 1  Full Outer Join 10 Columns 15.442877 Databricks
## 2                 Running Sum  7.039728 Databricks
## 3  Pivot 10 Rows and 1 Column 11.719025 Databricks
## 4        Inner Join 5 Columns 16.637699 Databricks
## 5   Pivot 5 Rows and 1 Column 11.277677 Databricks
## 6       Sorting Desc 5 column 10.368395 Databricks
summary(merged_data)
##     Language    Randomize Dataset    MachineID RunID   
##  PySpark:2267   1:5177    10 :1200   1:2762    2:1173  
##  Scala  :2910             100:1290   2:2415    3:1173  
##                           200: 945             4:1038  
##                           300: 932             5: 903  
##                           500: 810             6: 890  
##                                                        
##                                                        
##                   Type                            Operation   
##  Aggregate Operation: 698    GroupBy 10 columns        : 175  
##  Column Operation   :2559    Merge 2 columns into 1    : 175  
##  Mixed Operation    : 524    Merge 5 columns into 1    : 175  
##  Row Operation      :1396    Pivot 1 Rows and 1 Column : 175  
##                              Pivot 10 Rows and 1 Column: 175  
##                              Ranking by Group          : 175  
##                             (Other)                    :4127  
##    TimeTaken              Setup     
##  Min.   :  0.237   Databricks:2972  
##  1st Qu.:  3.792   Local VM  :2205  
##  Median : 11.450                    
##  Mean   : 25.430                    
##  3rd Qu.: 27.981                    
##  Max.   :353.195                    
## 
size_10MB =  11.4789848327637 # file.size("../../Data/Databricks/machine2/dataset_10MB.csv")/(1024*1024)
size_100MB = 115.640992164612 # file.size("../../Data/Databricks/machine2/dataset_100MB.csv")/(1024*1024) 
size_200MB = 229.8573  
size_300MB = 343.2709
size_500MB = 576.678165435791 # file.size("../../Data/Databricks/machine2/dataset_500MB.csv")/(1024*1024) 
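
The two hard-coded values were presumably measured the same way; the analogous calls would be (paths assumed by analogy with the comments above):

# size_200MB = file.size("../../Data/Databricks/machine2/dataset_200MB.csv")/(1024*1024) # path assumed
# size_300MB = file.size("../../Data/Databricks/machine2/dataset_300MB.csv")/(1024*1024) # path assumed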

print(paste("Actual Size of 10MB file (in MB)",size_10MB))
## [1] "Actual Size of 10MB file (in MB) 11.4789848327637"
print(paste("Actual Size of 100MB file (in MB)",size_100MB))
## [1] "Actual Size of 100MB file (in MB) 115.640992164612"
print(paste("Actual Size of 200MB file (in MB)",size_200MB))
## [1] "Actual Size of 200MB file (in MB) 229.8573"
print(paste("Actual Size of 300MB file (in MB)",size_300MB))
## [1] "Actual Size of 300MB file (in MB) 343.2709"
print(paste("Actual Size of 500MB file (in MB)",size_500MB))
## [1] "Actual Size of 500MB file (in MB) 576.678165435791"
size_info = data.frame(Dataset = c("10","100","200","300","500")
                       ,Size = c(size_10MB,size_100MB,size_200MB,size_300MB,size_500MB))
str(size_info)
## 'data.frame':    5 obs. of  2 variables:
##  $ Dataset: Factor w/ 5 levels "10","100","200",..: 1 2 3 4 5
##  $ Size   : num  11.5 115.6 229.9 343.3 576.7
merged_data = merged_data %>%
  merge(size_info) %>%
  mutate(Throughput = Size/TimeTaken)
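
merge() joins on the shared Dataset column, so every timing row picks up its file size, and Throughput is megabytes processed per second. A quick sanity check on the join (an added invariant, not part of the original analysis):

stopifnot(nrow(merged_data) == 5177,        # no rows lost or duplicated by the join
          !any(is.na(merged_data$Size)))    # every Dataset level matched a size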

Common Functions

summarize_results = function(grouped_data){
  # Per-group summary of time and throughput; the coefficient of variation is sd/mean
  rv = grouped_data %>%
    summarise(n = n()
            ,Mean_Time = round(mean(TimeTaken), 2)
            ,Std_Dev_Time = round(sd(TimeTaken), 2)
            ,Coeff_Var_Time = round(Std_Dev_Time/Mean_Time, 2)
            ,Mean_Throughput = round(mean(Throughput), 2)
            ,Std_Dev_Throughput = round(sd(Throughput), 2)
            ,Coeff_Var_Throughput = round(Std_Dev_Throughput/Mean_Throughput, 2)
            )
  return(rv)
}
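
summarize_results expects an already-grouped data frame; for example, a one-off summary by language alone (illustrative, not part of the report) looks like:

merged_data %>%
  group_by(Language) %>%
  summarize_results()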


plot_boxplots = function(grouped_data, by_var){
  # Draw one facet per group, with boxplots of TimeTaken coloured by the by_var column
  indices = grouped_data %>%
    dplyr::group_indices() %>%
    as.factor()

  grouped_data$Index = indices
  facet_form = as.formula(paste("~", paste(grouped_data %>% dplyr::group_vars(), collapse = " + ")))

  print(ggplot(grouped_data, aes_string(x = "Index", y = "TimeTaken", fill = by_var)) +
    geom_boxplot() +
    facet_wrap(facet_form, scales = 'free', ncol = 4, labeller = label_both))

  return(grouped_data)
}
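
plot_boxplots likewise takes a grouped data frame; an illustrative call that facets by Type and Dataset and colours by Language:

merged_data %>%
  group_by(Type, Dataset) %>%
  plot_boxplots(by_var = "Language")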

databricks_vs_localVM = function(arData) {
  # Summary table and boxplots comparing the Databricks and Local VM setups
  result = arData %>%
    group_by(Type, Operation, Language, MachineID, Dataset, Setup) %>%
    summarize_results()

  group = arData %>%
    group_by(Type, Operation, Language, MachineID, Dataset)
  plot_boxplots(grouped_data = group, by_var = "Setup")

  return(result)
}
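
databricks_vs_localVM both tabulates and plots one slice of the data; for instance (row.ops.comparison is an illustrative name, not used in the report):

row.ops.comparison = merged_data %>%  # illustrative example only
  filter(Type == "Row Operation") %>%
  databricks_vs_localVM()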

PySpark_vs_Scala = function(arData, arOpt = 2) {
  # arOpt = 1 returns only the summary table
  # arOpt = 2 only draws the boxplots
  # arOpt = 0 does both
  result = NA

  if (arOpt == 1 | arOpt == 0){
    result = arData %>%
      group_by(Type, Operation, Dataset, MachineID, Setup, Language) %>%
      summarize_results()
  }

  if (arOpt == 2 | arOpt == 0){
    group = arData %>%
      group_by(Type, Operation, Dataset, MachineID, Setup)
    plot_boxplots(grouped_data = group, by_var = "Language")
  }

  return(result)
}
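
The options value comes from params$options, so each comparison below is rendered on demand. A sketch of parameterised rendering (the .Rmd file name is assumed):

# rmarkdown::render("benchmark_report.Rmd", params = list(options = 1)) # file name assumed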

PySpark vs. Scala

Row Operations

if (options == 1){
  filtered = merged_data %>%
    filter(Type == "Row Operation") %>%
    PySpark_vs_Scala(arOpt = 2)
}

Column Operations

if (options == 2){
  filtered = merged_data %>%
    filter(Type == "Column Operation") %>%
    PySpark_vs_Scala(arOpt = 2)
}

Aggregate and Mixed Operations

if (options == 3){
  filtered = merged_data %>%
    filter(Type == "Aggregate Operation") %>%
    PySpark_vs_Scala(arOpt = 2)
  
  filtered = merged_data %>%
    filter(Type == "Mixed Operation") %>%
    PySpark_vs_Scala(arOpt = 2)
}